{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "# create entry points to spark\n",
    "try:\n",
    "    sc.stop()\n",
    "except:\n",
    "    pass\n",
    "from pyspark import SparkContext, SparkConf\n",
    "from pyspark.sql import SparkSession\n",
    "sc=SparkContext()\n",
    "spark = SparkSession(sparkContext=sc)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Convert continuous variables to categorical variables"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "There are two functions we can use to split a continuous variable into categories:\n",
    "\n",
    "* `pyspark.ml.feature.Binarizer`: split a column of continuous features given a threshold\n",
    "* `pyspark.ml.feature.Bucktizer`: split a column of continuous features into categories given several breaking points.\n",
    "    + with n+1 split points, there are n categories (buckets).\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Create some data"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+------------------+\n",
      "|                  x1|                x2|\n",
      "+--------------------+------------------+\n",
      "| 0.47143516373249306| 6.834629351721363|\n",
      "| -1.1909756947064645| 7.127020269829002|\n",
      "|  1.4327069684260973|3.7025075479039495|\n",
      "| -0.3126518960917129| 5.611961860656249|\n",
      "| -0.7205887333650116| 5.030831653078097|\n",
      "|  0.8871629403077386|0.1376844959068224|\n",
      "|  0.8595884137174165| 7.728266216123741|\n",
      "| -0.6365235044173491| 8.826411906361166|\n",
      "|0.015696372114428918| 3.648859839013723|\n",
      "| -2.2426849541854055| 6.153961784334937|\n",
      "+--------------------+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "import numpy as np\n",
    "import pandas as pd\n",
    "np.random.seed(seed=1234)\n",
    "pdf = pd.DataFrame({\n",
    "        'x1': np.random.randn(10),\n",
    "        'x2': np.random.rand(10)*10\n",
    "    })\n",
    "np.random.seed(seed=None)\n",
    "df = spark.createDataFrame(pdf)\n",
    "df.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Binarize the column x1 and Bucketize the column x2"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+--------------------+------------------+------+------+\n",
      "|                  x1|                x2|x1_new|x2_new|\n",
      "+--------------------+------------------+------+------+\n",
      "| 0.47143516373249306| 6.834629351721363|   1.0|   2.0|\n",
      "| -1.1909756947064645| 7.127020269829002|   0.0|   2.0|\n",
      "|  1.4327069684260973|3.7025075479039495|   1.0|   1.0|\n",
      "| -0.3126518960917129| 5.611961860656249|   0.0|   2.0|\n",
      "| -0.7205887333650116| 5.030831653078097|   0.0|   2.0|\n",
      "|  0.8871629403077386|0.1376844959068224|   1.0|   0.0|\n",
      "|  0.8595884137174165| 7.728266216123741|   1.0|   3.0|\n",
      "| -0.6365235044173491| 8.826411906361166|   0.0|   3.0|\n",
      "|0.015696372114428918| 3.648859839013723|   1.0|   1.0|\n",
      "| -2.2426849541854055| 6.153961784334937|   0.0|   2.0|\n",
      "+--------------------+------------------+------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "from pyspark.ml.feature import Binarizer, Bucketizer\n",
    "# threshold = 0 for binarizer\n",
    "binarizer = Binarizer(threshold=0, inputCol='x1', outputCol='x1_new')\n",
    "# provide 5 split points to generate 4 buckets\n",
    "bucketizer = Bucketizer(splits=[0, 2.5, 5, 7.5, 10], inputCol='x2', outputCol='x2_new')\n",
    "\n",
    "# pipeline stages\n",
    "from pyspark.ml import Pipeline\n",
    "stages = [binarizer, bucketizer]\n",
    "pipeline = Pipeline(stages=stages)\n",
    "\n",
    "# fit the pipeline model and transform the data\n",
    "pipeline.fit(df).transform(df).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
